/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.core;



import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.Combine.CombineFn;

import com.bff.gaia.unified.sdk.transforms.CombineFnBase.GlobalCombineFn;

import com.bff.gaia.unified.sdk.transforms.CombineWithContext;

import com.bff.gaia.unified.sdk.transforms.CombineWithContext.CombineFnWithContext;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.values.PCollectionView;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Iterables;



import java.util.Collection;



/**

 * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different

 * combine functions.

 */

public class GlobalCombineFnRunners {

  /** Returns a {@link GlobalCombineFnRunner} from a {@link GlobalCombineFn}. */

  public static <InputT, AccumT, OutputT> GlobalCombineFnRunner<InputT, AccumT, OutputT> create(

      GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {

    if (globalCombineFn instanceof CombineFnWithContext) {

      return new CombineFnWithContextRunner<>(

          (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn);

    } else if (globalCombineFn instanceof CombineFn) {

      return new CombineFnRunner<>((CombineFn<InputT, AccumT, OutputT>) globalCombineFn);

    } else {

      throw new IllegalStateException(

          String.format("Unknown type of CombineFn: %s", globalCombineFn.getClass()));

    }

  }



  /**

   * Returns a {@code Combine.Context} from {@code PipelineOptions}, {@code SideInputReader}, and

   * the main input window.

   */

  private static CombineWithContext.Context createFromComponents(

      final PipelineOptions options,

      final SideInputReader sideInputReader,

      final BoundedWindow mainInputWindow) {

    return new CombineWithContext.Context() {

      @Override

      public PipelineOptions getPipelineOptions() {

        return options;

      }



      @Override

      public <T> T sideInput(PCollectionView<T> view) {

        if (!sideInputReader.contains(view)) {

          throw new IllegalArgumentException("calling sideInput() with unknown view");

        }



        BoundedWindow sideInputWindow =

            view.getWindowMappingFn().getSideInputWindow(mainInputWindow);

        return sideInputReader.get(view, sideInputWindow);

      }

    };

  }



  /**

   * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}.

   *

   * <p>It forwards functions calls to the {@link CombineFn}.

   */

  private static class CombineFnRunner<InputT, AccumT, OutputT>

      implements GlobalCombineFnRunner<InputT, AccumT, OutputT> {

    private final CombineFn<InputT, AccumT, OutputT> combineFn;



    private CombineFnRunner(CombineFn<InputT, AccumT, OutputT> combineFn) {

      this.combineFn = combineFn;

    }



    @Override

    public String toString() {

      return combineFn.toString();

    }



    @Override

    public AccumT createAccumulator(

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFn.createAccumulator();

    }



    @Override

    public AccumT addInput(

        AccumT accumulator,

        InputT input,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFn.addInput(accumulator, input);

    }



    @Override

    public AccumT mergeAccumulators(

        Iterable<AccumT> accumulators,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFn.mergeAccumulators(accumulators);

    }



    @Override

    public OutputT extractOutput(

        AccumT accumulator,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFn.extractOutput(accumulator);

    }



    @Override

    public AccumT compact(

        AccumT accumulator,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFn.compact(accumulator);

    }

  }



  /**

   * An implementation of {@link GlobalCombineFnRunner} with {@link

   * CombineFnWithContext}.

   *

   * <p>It forwards functions calls to the {@link CombineFnWithContext}.

   */

  private static class CombineFnWithContextRunner<InputT, AccumT, OutputT>

      implements GlobalCombineFnRunner<InputT, AccumT, OutputT> {

    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext;



    private CombineFnWithContextRunner(

        CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {

      this.combineFnWithContext = combineFnWithContext;

    }



    @Override

    public String toString() {

      return combineFnWithContext.toString();

    }



    @Override

    public AccumT createAccumulator(

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFnWithContext.createAccumulator(

          createFromComponents(options, sideInputReader, Iterables.getOnlyElement(windows)));

    }



    @Override

    public AccumT addInput(

        AccumT accumulator,

        InputT input,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFnWithContext.addInput(

          accumulator,

          input,

          createFromComponents(options, sideInputReader, Iterables.getOnlyElement(windows)));

    }



    @Override

    public AccumT mergeAccumulators(

        Iterable<AccumT> accumulators,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFnWithContext.mergeAccumulators(

          accumulators,

          createFromComponents(options, sideInputReader, Iterables.getOnlyElement(windows)));

    }



    @Override

    public OutputT extractOutput(

        AccumT accumulator,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFnWithContext.extractOutput(

          accumulator,

          createFromComponents(options, sideInputReader, Iterables.getOnlyElement(windows)));

    }



    @Override

    public AccumT compact(

        AccumT accumulator,

        PipelineOptions options,

        SideInputReader sideInputReader,

        Collection<? extends BoundedWindow> windows) {

      return combineFnWithContext.compact(

          accumulator,

          createFromComponents(options, sideInputReader, Iterables.getOnlyElement(windows)));

    }

  }

}